{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "# Name\n",
    "Data preparation by using a Flex template to submit a job to Cloud Dataflow\n",
    "\n",
    "# Labels\n",
    "GCP, Cloud Dataflow, Kubeflow, Pipeline\n",
    "\n",
    "# Summary\n",
    "A Kubeflow Pipeline component to prepare data by using a Flex template to submit a job to Cloud Dataflow.\n",
    "\n",
    "# Details\n",
    "\n",
    "## Intended use\n",
    "Use this component when you have a pre-built Cloud Dataflow Flex template and want to launch it as a step in a Kubeflow Pipeline.\n",
    "\n",
    "## Runtime arguments\n",
    "Argument        | Description                 | Optional   | Data type  | Accepted values | Default    |\n",
    ":---            | :----------                 | :----------| :----------| :----------     | :----------|\n",
    "project_id | The ID of the Google Cloud Platform (GCP) project to which the job belongs. | No | GCPProjectID |  |  |\n",
    "location | The regional endpoint to which the job request is directed.| No  |  GCPRegion |    |   |\n",
    "launch_parameters | The parameters that are required to launch the flex template. The schema is defined in [LaunchFlexTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#LaunchFlexTemplateParameter) | None |\n",
    "staging_dir |  The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information. This is done so that you can resume the job in case of failure.|  Yes |  GCSPath |   |  None |\n",
    "wait_interval | The number of seconds to wait between calls to get the status of the job. |  Yes  | Integer  |   |  30 |\n",
    "\n",
    "## Input data schema\n",
    "\n",
    "The input `gcs_path` must contain a valid Cloud Dataflow template. The template can be created by following the instructions in [Creating Templates](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates). You can also use [Google-provided templates](https://cloud.google.com/dataflow/docs/guides/templates/provided-templates).\n",
    "\n",
    "## Output\n",
    "Name | Description\n",
    ":--- | :----------\n",
    "job_id | The id of the Cloud Dataflow job that is created.\n",
    "\n",
    "\n",
    "## Detailed description\n",
    "This job use the google provided [BigQuery to Cloud Storage Parquet template](https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#running-the-bigquery-to-cloud-storage-parquet-template)\n",
    "to run a Dataflow pipeline that reads the [Shakespeare word index](https://cloud.google.com/bigquery/public-data#sample_tables)\n",
    "sample public dataset and writes the data in Parquet format to the user provided GCS bucket.\n",
    "\n",
    "## Caution & requirements\n",
    "To use the component, the following requirements must be met:\n",
    "- Cloud Dataflow API is enabled.\n",
    "- The component can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.\n",
    "- The output cloud storage bucket must exist before running the pipeline.\n",
    "- The Kubeflow user service account is a member of:\n",
    "    - `roles/dataflow.developer` role of the project.\n",
    "    - `roles/storage.objectCreator` role of the Cloud Storage Object for `staging_dir.` and output data folder.\n",
    "- The dataflow controller service account is a member of:\n",
    "    - `roles/bigquery.readSessionUser` role to create read sessions in the project.\n",
    "    - `roles/bigquery.jobUser` role to run jobs including queries.\n",
    "    - `roles/bigquery.dataViewer` role to read data and metadata from the table of view\n",
    "\n",
    "\n",
    "<br>"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%% md\n"
    }
   }
  },
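  {
   "cell_type": "markdown",
   "source": [
    "The `wait_interval` argument controls how often the job status is polled. The cell below is a minimal sketch of that polling pattern, not the component's actual implementation: `wait_for_job`, `get_state`, and `TERMINAL_STATES` are hypothetical names, and the state strings are assumed to follow the Dataflow `JobState` enum. The status source is faked so that the cell runs without any GCP access."
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "# Assumed terminal states, named after values of the Dataflow JobState enum.\n",
    "TERMINAL_STATES = {'JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'}\n",
    "\n",
    "\n",
    "def wait_for_job(get_state, wait_interval=30, sleep=time.sleep):\n",
    "    # Call get_state() repeatedly, sleeping wait_interval seconds between calls,\n",
    "    # and return the first terminal state that is observed.\n",
    "    while True:\n",
    "        state = get_state()\n",
    "        if state in TERMINAL_STATES:\n",
    "            return state\n",
    "        sleep(wait_interval)\n",
    "\n",
    "\n",
    "# Usage with a fake status source standing in for the Dataflow API.\n",
    "fake_states = iter(['JOB_STATE_PENDING', 'JOB_STATE_RUNNING', 'JOB_STATE_DONE'])\n",
    "final_state = wait_for_job(lambda: next(fake_states), wait_interval=0)\n",
    "print(final_state)"
   ],
   "metadata": {
    "collapsed": false
   }
  },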
  {
   "cell_type": "markdown",
   "source": [
    "### Follow these steps to use the component in a pipeline:\n",
    "\n",
    "#### 1. Install the Kubeflow Pipeline SDK:"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "\n",
    "!pip3 install kfp --upgrade"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "#### 2. Load the component using KFP SDK"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "dataflow_template_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.2/components/gcp/dataflow/launch_flex_template/component.yaml')\n",
    "help(dataflow_template_op)"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "#### 3. Configure job parameters"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%% md\n"
    }
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "PROJECT_ID = '[Your PROJECT_ID]'\n",
    "BIGQUERY_TABLE_SPEC = '[Your PROJECT_ID:DATASET_ID.TABLE_ID]'\n",
    "GCS_OUTPUT_FOLDER = 'gs://[Your output GCS folder]'\n",
    "GCS_STAGING_FOLDER = 'gs://[Your staging GCS folder]'\n",
    "LOCATION = 'us'\n",
    "# Optional Parameters\n",
    "EXPERIMENT_NAME = 'Dataflow - Launch Flex Template'\n",
    "\n",
    "flex_temp_launch_parameters = {\n",
    "    \"parameters\": {\n",
    "        \"tableRef\": BIGQUERY_TABLE_SPEC,\n",
    "        \"bucket\": GCS_OUTPUT_FOLDER\n",
    "    },\n",
    "    \"containerSpecGcsPath\": \"gs://dataflow-templates/2021-03-29-00_RC00/flex/BigQuery_to_Parquet\",\n",
    "}"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
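  {
   "cell_type": "markdown",
   "source": [
    "Before submitting the job, it can help to check that the launch parameters no longer contain placeholders. The cell below is a minimal sketch of such a check, assuming that `containerSpecGcsPath` must be a `gs://` path and that every entry in `parameters` is a string. `check_launch_parameters` and `example_parameters` are hypothetical names introduced for illustration; they are not part of the component."
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "def check_launch_parameters(params):\n",
    "    # Return a list of problems; an empty list means no problem was found.\n",
    "    problems = []\n",
    "    if not params.get('containerSpecGcsPath', '').startswith('gs://'):\n",
    "        problems.append('containerSpecGcsPath must be a gs:// path')\n",
    "    for key, value in params.get('parameters', {}).items():\n",
    "        if not isinstance(value, str):\n",
    "            problems.append('parameter %s must be a string' % key)\n",
    "        elif '[Your' in value:\n",
    "            problems.append('parameter %s still contains a placeholder' % key)\n",
    "    return problems\n",
    "\n",
    "\n",
    "# Hypothetical values, for illustration only.\n",
    "example_parameters = {\n",
    "    'parameters': {\n",
    "        'tableRef': 'my-project:my_dataset.my_table',\n",
    "        'bucket': 'gs://[Your output GCS folder]',\n",
    "    },\n",
    "    'containerSpecGcsPath': 'gs://dataflow-templates/2021-03-29-00_RC00/flex/BigQuery_to_Parquet',\n",
    "}\n",
    "print(check_launch_parameters(example_parameters))"
   ],
   "metadata": {
    "collapsed": false
   }
  },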
  {
   "cell_type": "markdown",
   "source": [
    "#### 4. Example pipeline that uses the component"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "import json\n",
    "@dsl.pipeline(\n",
    "    name='Dataflow launch flex template pipeline',\n",
    "    description='Dataflow launch flex template pipeline'\n",
    ")\n",
    "def pipeline(\n",
    "    project_id = PROJECT_ID,\n",
    "    location = LOCATION,\n",
    "    launch_parameters = json.dumps(flex_temp_launch_parameters),\n",
    "    staging_dir = GCS_STAGING_FOLDER,\n",
    "    wait_interval = 30):\n",
    "    dataflow_template_op(\n",
    "        project_id = project_id,\n",
    "        location = location,\n",
    "        launch_parameters = launch_parameters,\n",
    "        staging_dir = staging_dir,\n",
    "        wait_interval = wait_interval)"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "#### 5. Create pipeline run"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "import kfp\n",
    "\n",
    "pipeline_func = pipeline\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "\n",
    "kfp.Client().create_run_from_pipeline_func(\n",
    "    pipeline_func,\n",
    "    arguments = {},\n",
    "    run_name = run_name,\n",
    "    experiment_name=EXPERIMENT_NAME,\n",
    "    namespace='default'\n",
    ")"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "#### 6. Inspect the output"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "outputs": [],
   "source": [
    "!gsutil cat $GCS_OUTPUT_FOLDER*"
   ],
   "metadata": {
    "collapsed": false,
    "pycharm": {
     "name": "#%%\n"
    }
   }
  },
  {
   "cell_type": "markdown",
   "source": [
    "## References\n",
    "\n",
    "* [Component python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_flex_template.py)\n",
    "* [Component docker file](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)\n",
    "* [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/dataflow/launch_flex_template/sample.ipynb)\n",
    "* [Cloud Dataflow Templates overview](https://cloud.google.com/dataflow/docs/guides/templates/overview)\n",
    "\n",
    "## License\n",
    "By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.\n"
   ],
   "metadata": {
    "collapsed": false
   }
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}